/*
Copyright 2024 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runtime

import (
	"context"
	"fmt"
	"math"
	"reflect"
	"time"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"

	"github.com/karmada-io/karmada/pkg/estimator/pb"
	"github.com/karmada-io/karmada/pkg/estimator/server/framework"
	"github.com/karmada-io/karmada/pkg/estimator/server/metrics"
	schedcache "github.com/karmada-io/karmada/pkg/util/lifted/scheduler/cache"
	utilmetrics "github.com/karmada-io/karmada/pkg/util/metrics"
)

const (
	// estimator is the legacy label for replica estimation metrics.
	// Deprecated: Use estimateReplicasExtension instead. This will be removed in a future release.
	estimator                   = "Estimator"
	estimateReplicasExtension   = "estimate_replicas"
	estimateComponentsExtension = "estimate_components"
)

// frameworkImpl implements the Framework interface and is responsible for initializing and running scheduler
// plugins.
type frameworkImpl struct {
	estimateReplicasPlugins   []framework.EstimateReplicasPlugin
	estimateComponentsPlugins []framework.EstimateComponentsPlugin
	clientSet                 clientset.Interface
	informerFactory           informers.SharedInformerFactory
	parallelism               int
}

var _ framework.Framework = &frameworkImpl{}

type frameworkOptions struct {
	clientSet       clientset.Interface
	informerFactory informers.SharedInformerFactory
	parallelism     int
}

// Option for the frameworkImpl.
type Option func(*frameworkOptions)

func defaultFrameworkOptions() frameworkOptions {
	return frameworkOptions{}
}

// WithClientSet sets clientSet for the scheduling frameworkImpl.
func WithClientSet(clientSet clientset.Interface) Option {
	return func(o *frameworkOptions) {
		o.clientSet = clientSet
	}
}

// WithInformerFactory sets informer factory for the scheduling frameworkImpl.
func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
	return func(o *frameworkOptions) {
		o.informerFactory = informerFactory
	}
}

// WithParallelism sets parallelism for the scheduling frameworkImpl.
func WithParallelism(parallelism int) Option {
	return func(o *frameworkOptions) {
		o.parallelism = parallelism
	}
}

// NewFramework creates a scheduling framework by registry.
func NewFramework(r Registry, opts ...Option) (framework.Framework, error) {
	options := defaultFrameworkOptions()
	for _, opt := range opts {
		opt(&options)
	}
	f := &frameworkImpl{
		informerFactory: options.informerFactory,
	}
	estimateReplicasPluginsList := reflect.ValueOf(&f.estimateReplicasPlugins).Elem()
	estimateReplicasType := estimateReplicasPluginsList.Type().Elem()

	estimateComponentsPluginsList := reflect.ValueOf(&f.estimateComponentsPlugins).Elem()
	estimateComponentsType := estimateComponentsPluginsList.Type().Elem()

	for name, factory := range r {
		p, err := factory(f)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize plugin %q: %w", name, err)
		}
		addPluginToList(p, estimateReplicasType, &estimateReplicasPluginsList)
		addPluginToList(p, estimateComponentsType, &estimateComponentsPluginsList)
	}
	return f, nil
}

func addPluginToList(plugin framework.Plugin, pluginType reflect.Type, pluginList *reflect.Value) {
	if reflect.TypeOf(plugin).Implements(pluginType) {
		newPlugins := reflect.Append(*pluginList, reflect.ValueOf(plugin))
		pluginList.Set(newPlugins)
	}
}

// ClientSet returns a kubernetes clientset.
func (frw *frameworkImpl) ClientSet() clientset.Interface {
	return frw.clientSet
}

// SharedInformerFactory returns a shared informer factory.
func (frw *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory {
	return frw.informerFactory
}

// Parallelism returns the amount of parallelism in algorithms for estimating.
func (frw *frameworkImpl) Parallelism() int {
	return frw.parallelism
}

// RunEstimateReplicasPlugins runs the set of configured EstimateReplicasPlugins
// for estimating replicas based on the given replicaRequirements.
// It returns an integer and an error.
// The integer represents the minimum calculated value of estimated replicas from each EstimateReplicasPlugin.
func (frw *frameworkImpl) RunEstimateReplicasPlugins(ctx context.Context, snapshot *schedcache.Snapshot, replicaRequirements *pb.ReplicaRequirements) (int32, *framework.Result) {
	startTime := time.Now()
	defer func() {
		// Emit metrics with both old and new labels for backward compatibility
		// TODO: Remove estimator label in a future release (deprecated)
		metrics.FrameworkExtensionPointDuration.WithLabelValues(estimator).Observe(utilmetrics.DurationInSeconds(startTime))
		metrics.FrameworkExtensionPointDuration.WithLabelValues(estimateReplicasExtension).Observe(utilmetrics.DurationInSeconds(startTime))
	}()
	var replica int32 = math.MaxInt32
	results := make(framework.PluginToResult)
	for _, pl := range frw.estimateReplicasPlugins {
		plReplica, ret := frw.runEstimateReplicasPlugins(ctx, pl, snapshot, replicaRequirements)
		if (ret.IsSuccess() || ret.IsUnschedulable()) && plReplica < replica {
			replica = plReplica
		}
		if ret.IsFailed() {
			replica = 0
		}
		results[pl.Name()] = ret
	}
	return replica, results.Merge()
}

func (frw *frameworkImpl) runEstimateReplicasPlugins(
	ctx context.Context,
	pl framework.EstimateReplicasPlugin,
	snapshot *schedcache.Snapshot,
	replicaRequirements *pb.ReplicaRequirements,
) (int32, *framework.Result) {
	startTime := time.Now()
	replica, ret := pl.Estimate(ctx, snapshot, replicaRequirements)
	// Emit metrics with both old and new labels for backward compatibility
	// TODO: Remove estimator label in a future release (deprecated)
	metrics.PluginExecutionDuration.WithLabelValues(pl.Name(), estimator).Observe(utilmetrics.DurationInSeconds(startTime))
	metrics.PluginExecutionDuration.WithLabelValues(pl.Name(), estimateReplicasExtension).Observe(utilmetrics.DurationInSeconds(startTime))
	return replica, ret
}

// RunEstimateComponentsPlugins runs the set of configured EstimateComponentsPlugins
// for estimating the maximum number of complete component sets based on the given components.
// It returns an integer and a Result.
// The integer represents the minimum calculated value of estimated component sets from each EstimateComponentsPlugin.
func (frw *frameworkImpl) RunEstimateComponentsPlugins(ctx context.Context, snapshot *schedcache.Snapshot, components []pb.Component, namespace string) (int32, *framework.Result) {
	startTime := time.Now()
	defer func() {
		metrics.FrameworkExtensionPointDuration.WithLabelValues(estimateComponentsExtension).Observe(utilmetrics.DurationInSeconds(startTime))
	}()
	var sets int32 = math.MaxInt32
	results := make(framework.PluginToResult)
	for _, pl := range frw.estimateComponentsPlugins {
		plSets, ret := frw.runEstimateComponentsPlugins(ctx, pl, snapshot, components, namespace)
		if (ret.IsSuccess() || ret.IsUnschedulable()) && plSets < sets {
			sets = plSets
		}
		results[pl.Name()] = ret
	}
	return sets, results.Merge()
}

func (frw *frameworkImpl) runEstimateComponentsPlugins(ctx context.Context, pl framework.EstimateComponentsPlugin, snapshot *schedcache.Snapshot, components []pb.Component, namespace string) (int32, *framework.Result) {
	startTime := time.Now()
	sets, ret := pl.EstimateComponents(ctx, snapshot, components, namespace)
	metrics.PluginExecutionDuration.WithLabelValues(pl.Name(), estimateComponentsExtension).Observe(utilmetrics.DurationInSeconds(startTime))
	return sets, ret
}
